package com.data.dev.kafka;

import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.KafkaKey;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

import java.util.HashMap;

/**
 * Builder for Kafka source connectors.
 *
 * <p>Created 2022-06-17 15:22:33.
 *
 * @author wangxiaoming
 */
public class KafkaSourceBuilder {

    /**
     * Builds a {@link KafkaSource} that emits each record value as a {@code String}.
     *
     * <p>All connection settings (bootstrap servers, offset-reset policy, security protocol,
     * SASL options and deserializer class names) are read from the map returned by
     * {@code ConfigurationKey.getApplicationProps()}. Every value is validated before it is
     * handed to the Flink builder, so a missing entry fails immediately with a message that
     * names the missing key instead of surfacing later as an anonymous
     * {@code NullPointerException} or as a source subscribed to a {@code null} topic.
     *
     * @param topicName key in the application properties under which the actual Kafka topic
     *                  name is stored (this is a lookup key, not the topic name itself)
     * @param GroupId   consumer group id used by the source
     * @return the configured Kafka source
     * @throws NullPointerException if {@code GroupId} is {@code null}, or if the topic mapping
     *                              or any required Kafka property is absent from the
     *                              application properties
     */
    public static KafkaSource<String> getKafkaSource(String topicName,String GroupId){

        if (GroupId == null) {
            throw new NullPointerException("GroupId must not be null");
        }

        final HashMap<String,String> appProps = ConfigurationKey.getApplicationProps();

        // The caller passes a property key; the real topic name is resolved from the config.
        final String topic = requireProperty(appProps, topicName);

        return KafkaSource.<String>builder()
                .setBootstrapServers(requireProperty(appProps, ConfigurationKey.KAFKA_DNS_BOOTSTRAP_SERVER))
                .setTopics(topic)
                .setGroupId(GroupId)
                .setProperty(KafkaKey.AUTO_OFFSET_RESET, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_AUTO_OFFSET_RESET))
                .setProperty(KafkaKey.SECURITY_PROTOCOL, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_SECURITY_PROTOCOL))
                .setProperty(KafkaKey.SASL_MECHANISM, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_SASL_MECHANISM))
                .setProperty(KafkaKey.SASL_JAAS_CONFIG, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_SASL_JAAS_CONFIG))
                // NOTE(review): Flink's KafkaSource documentation states that key.deserializer and
                // value.deserializer are always overridden with ByteArrayDeserializer, so these two
                // settings presumably have no effect. Kept to preserve existing behavior; confirm
                // before removing.
                .setProperty(KafkaKey.KEY_DESERIALIZER, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_KEY_DESERIALIZER))
                .setProperty(KafkaKey.VALUE_DESERIALIZER, requireProperty(appProps, ConfigurationKey.KAFKA_DNS_VALUE_DESERIALIZER))
                // Record values are decoded as strings; record keys are not exposed downstream.
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /**
     * Returns the value stored under {@code key}, failing fast when it is absent.
     *
     * @param props application properties to look the key up in
     * @param key   property key to resolve
     * @return the non-null value mapped to {@code key}
     * @throws NullPointerException if {@code props} has no value for {@code key}
     */
    private static String requireProperty(HashMap<String,String> props, String key) {
        final String value = props.get(key);
        if (value == null) {
            throw new NullPointerException("Missing required application property: " + key);
        }
        return value;
    }
}
